package resource

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
	"github.com/prometheus/client_golang/prometheus"
	"go.opentelemetry.io/otel/trace"
	"go.opentelemetry.io/otel/trace/noop"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

	claims "github.com/grafana/authlib/types"
	"github.com/grafana/dskit/backoff"

	"github.com/grafana/grafana/pkg/apimachinery/utils"
	"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
	"github.com/grafana/grafana/pkg/util/scheduler"
)

// Retry/backoff defaults for enqueue operations.
const (
	// DefaultMaxBackoff is the default maximum backoff duration for enqueue operations.
	DefaultMaxBackoff = 1 * time.Second
	// DefaultMinBackoff is the default minimum backoff duration for enqueue operations.
	DefaultMinBackoff = 100 * time.Millisecond
	// DefaultMaxRetries is the default maximum number of retries for enqueue operations.
	DefaultMaxRetries = 3
)

// ResourceServer is the union of every gRPC service the resource server
// exposes: resource CRUD/list/watch, bulk import, search indexes, managed
// object indexes, blob storage and diagnostics.
type ResourceServer interface {
	resourcepb.ResourceStoreServer
	resourcepb.BulkStoreServer
	resourcepb.ResourceIndexServer
	resourcepb.ManagedObjectIndexServer
	resourcepb.BlobStoreServer
	resourcepb.DiagnosticsServer
}

// ListIterator walks the items produced by a StorageBackend list or history
// query. Callers advance with Next and read the current item through the other
// accessors.
type ListIterator interface {
	// Next advances the iterator and returns true if another value is available.
	// Error() should be checked after every call of Next(), even when Next() returns true.
	Next() bool // cf. sql.Rows.Next

	// Error returns the iterator error, if any. This should be checked after any Next() call.
	// (Some iterator implementations return true from Next, but also set the error at the same time).
	Error() error

	// ContinueToken returns the token that can be used to start iterating *after* this item
	ContinueToken() string

	// ResourceVersion of the current item
	ResourceVersion() int64

	// Namespace of the current item
	// Used for fast(er) authz filtering
	Namespace() string

	// Name of the current item
	// Used for fast(er) authz filtering
	Name() string

	// Folder of the current item
	// Used for fast(er) authz filtering
	Folder() string

	// Value for the current item (the raw stored bytes)
	Value() []byte
}

// BackendReadResponse is the result of StorageBackend.ReadResource. Failures
// are reported through Error rather than a Go error.
type BackendReadResponse struct {
	// Metadata
	Key *resourcepb.ResourceKey
	// Folder the resource is stored in; the server uses it for access checks
	Folder string

	// GUID that is used internally
	GUID string
	// The resource version of the value that was read
	ResourceVersion int64
	// The properties (raw stored object bytes)
	Value []byte
	// Error details; a Code of http.StatusNotFound is treated as "not found" by the server
	Error *resourcepb.ErrorResult
}

// The StorageBackend is an internal abstraction that supports interacting with
// the underlying raw storage medium.  This interface is never exposed directly,
// it is provided by concrete instances that actually write values.
type StorageBackend interface {
	// Write a Create/Update/Delete,
	// NOTE: the contents of WriteEvent have been validated
	// Return the resource version assigned to this event or an error
	WriteEvent(context.Context, WriteEvent) (int64, error)

	// Read a resource from storage optionally at an explicit version.
	// Failures are reported in the response's Error field.
	ReadResource(context.Context, *resourcepb.ReadRequest) *BackendReadResponse

	// When the ResourceServer executes a List request, this iterator will
	// query the backend for potential results.  All results will be
	// checked against the kubernetes requirements before finally returning
	// results.  The list options can be used to improve performance
	// but are not the final answer.
	// The callback receives the iterator; the returned int64 is the
	// resource version of the list.
	ListIterator(context.Context, *resourcepb.ListRequest, func(ListIterator) error) (int64, error)

	// ListHistory is like ListIterator, but it returns the history of a resource
	// (the server also uses it for trash queries)
	ListHistory(context.Context, *resourcepb.ListRequest, func(ListIterator) error) (int64, error)

	// Get all events from the store
	// For HA setups, this will be more events than the local WriteEvent above!
	WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error)

	// Get resource stats within the storage backend.  When namespace is empty, it will apply to all
	// NOTE(review): minCount presumably filters out resources with fewer items — confirm with implementations
	GetResourceStats(ctx context.Context, namespace string, minCount int) ([]ResourceStats, error)
}

// ResourceStats is one row of StorageBackend.GetResourceStats: the item count
// and resource version for a namespaced resource.
type ResourceStats struct {
	NamespacedResource

	Count           int64
	ResourceVersion int64
}

// BlobSupport stores and retrieves binary blobs attached to resources.
// This interface is not exposed to end users directly
// Access to this interface is already gated by access control
type BlobSupport interface {
	// Indicates if storage layer supports signed urls
	SupportsSignedURLs() bool

	// Put the raw blob bytes and metadata -- limited to protobuf message size
	// For larger payloads, we should use presigned URLs to upload from the client
	PutResourceBlob(context.Context, *resourcepb.PutBlobRequest) (*resourcepb.PutBlobResponse, error)

	// Get blob contents.  When possible, this will return a signed URL
	// For large payloads, signed URLs are required to avoid protobuf message size limits
	GetResourceBlob(ctx context.Context, resource *resourcepb.ResourceKey, info *utils.BlobInfo, mustProxy bool) (*resourcepb.GetBlobResponse, error)

	// TODO? List+Delete?  This is for admin access
}

// QOSEnqueuer schedules runnable on a quality-of-service queue keyed by
// tenantID, returning an error when the work cannot be enqueued.
// NOTE(review): handlers in this file pass the request namespace to
// runInQueue, which presumably becomes tenantID — confirm.
type QOSEnqueuer interface {
	Enqueue(ctx context.Context, tenantID string, runnable func()) error
}

// BlobConfig selects blob storage for the server. Backend takes precedence;
// otherwise URL is opened as a bucket; with neither, the storage backend is
// used if it implements BlobSupport.
type BlobConfig struct {
	// The CDK configuration URL
	URL string

	// Directly implemented blob support
	Backend BlobSupport
}

// SearchOptions configures search support. It is passed as input to the
// constructor; search is only enabled when Resources is non-nil.
type SearchOptions struct {
	// The raw index backend (eg, bleve, frames, parquet, etc)
	Backend SearchBackend

	// The supported resource types
	Resources DocumentBuilderSupplier

	// How many threads should build indexes
	WorkerThreads int

	// Skip building index on startup for small indexes
	InitMinCount int

	// Build empty index on startup for large indexes so that
	// we don't re-attempt to build the index later.
	InitMaxCount int

	// Channel to watch for index events (for testing)
	IndexEventsChan chan *IndexEvent

	// Interval for periodic index rebuilds (0 disables periodic rebuilds)
	RebuildInterval time.Duration
}

// ResourceServerOptions configures NewResourceServer. Only Backend is required;
// the constructor fills in defaults for several optional fields, as noted below.
type ResourceServerOptions struct {
	// OTel tracer (defaults to a no-op tracer)
	Tracer trace.Tracer

	// Real storage backend (required)
	Backend StorageBackend

	// The blob configuration
	Blob BlobConfig

	// Search options
	Search SearchOptions

	// Diagnostics (defaults to a no-op service)
	Diagnostics resourcepb.DiagnosticsServer

	// Check if a user has access to write folders
	// When this is nil, no resources can have folders configured
	WriteHooks WriteAccessHooks

	// Link RBAC (defaults to an access client that allows everything)
	AccessClient claims.AccessClient

	// Callbacks for startup and shutdown
	Lifecycle LifecycleHooks

	// Get the current time in unix millis (defaults to the wall clock)
	Now func() int64

	// Registerer to register prometheus Metrics for the Resource server
	Reg prometheus.Registerer

	storageMetrics *StorageMetrics

	IndexMetrics *BleveIndexMetrics

	// MaxPageSizeBytes is the maximum size of a page in bytes.
	// Values <= 0 are replaced with 2MB.
	MaxPageSizeBytes int

	// QOSQueue is the quality of service queue used to enqueue
	// requests (defaults to a no-op queue)
	QOSQueue QOSEnqueuer
}

// NewResourceServer builds a ResourceServer from opts and runs Init before
// returning it. Defaults are filled in for any optional dependency that was
// left unset:
//   - Tracer: a no-op tracer
//   - AccessClient: claims.FixedAccessClient(true), which allows every request
//   - Diagnostics: a no-op diagnostics service
//   - Now: wall-clock time in unix milliseconds
//   - MaxPageSizeBytes: 2MB when unset or non-positive
//   - QOSQueue: a no-op queue
//
// Blob support comes from opts.Blob.Backend, else a bucket opened from
// opts.Blob.URL, else opts.Backend when it implements BlobSupport.
//
// An error is returned when opts.Backend is nil, when the blob bucket cannot
// be opened, or when search setup or Init fails. On the last two paths the
// server's cancelable context is cancelled before returning, so anything that
// was handed that context during a partial Init can observe the shutdown.
func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
	if opts.Tracer == nil {
		opts.Tracer = noop.NewTracerProvider().Tracer("resource-server")
	}

	if opts.Backend == nil {
		return nil, fmt.Errorf("missing Backend implementation")
	}

	if opts.AccessClient == nil {
		opts.AccessClient = claims.FixedAccessClient(true) // everything OK
	}

	if opts.Diagnostics == nil {
		opts.Diagnostics = &noopService{}
	}

	if opts.Now == nil {
		opts.Now = func() int64 {
			return time.Now().UnixMilli()
		}
	}

	if opts.MaxPageSizeBytes <= 0 {
		// By default, we use 2MB for the page size.
		opts.MaxPageSizeBytes = 1024 * 1024 * 2
	}

	if opts.QOSQueue == nil {
		opts.QOSQueue = scheduler.NewNoopQueue()
	}

	// Initialize the blob storage
	blobstore := opts.Blob.Backend
	if blobstore == nil {
		if opts.Blob.URL != "" {
			ctx := context.Background()
			bucket, err := OpenBlobBucket(ctx, opts.Blob.URL)
			if err != nil {
				return nil, err
			}

			blobstore, err = NewCDKBlobSupport(ctx, CDKBlobSupportOptions{
				Tracer: opts.Tracer,
				Bucket: NewInstrumentedBucket(bucket, opts.Reg, opts.Tracer),
			})
			if err != nil {
				return nil, err
			}
		} else {
			// Check if the backend supports blob storage
			blobstore, _ = opts.Backend.(BlobSupport)
		}
	}

	logger := slog.Default().With("logger", "resource-server")

	// Make this cancelable; Stop (or a failed construction below) cancels it.
	ctx, cancel := context.WithCancel(context.Background())
	s := &server{
		tracer:           opts.Tracer,
		log:              logger,
		backend:          opts.Backend,
		blob:             blobstore,
		diagnostics:      opts.Diagnostics,
		access:           opts.AccessClient,
		writeHooks:       opts.WriteHooks,
		lifecycle:        opts.Lifecycle,
		now:              opts.Now,
		ctx:              ctx,
		cancel:           cancel,
		storageMetrics:   opts.storageMetrics,
		indexMetrics:     opts.IndexMetrics,
		maxPageSizeBytes: opts.MaxPageSizeBytes,
		reg:              opts.Reg,
		queue:            opts.QOSQueue,
	}

	if opts.Search.Resources != nil {
		var err error
		s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.Tracer, opts.IndexMetrics)
		if err != nil {
			cancel()
			return nil, err
		}
	}

	if err := s.Init(ctx); err != nil {
		s.log.Error("resource server init failed", "error", err)
		// The server is discarded; release its context so work that Init
		// started with it is not left running.
		cancel()
		return nil, err
	}

	return s, nil
}

// Compile-time check that *server implements ResourceServer.
var _ ResourceServer = &server{}

// server is the ResourceServer implementation returned by NewResourceServer.
type server struct {
	tracer         trace.Tracer
	log            *slog.Logger
	backend        StorageBackend
	blob           BlobSupport
	search         *searchSupport // nil when search is not configured
	diagnostics    resourcepb.DiagnosticsServer
	access         claims.AccessClient
	writeHooks     WriteAccessHooks
	lifecycle      LifecycleHooks
	now            func() int64
	mostRecentRV   atomic.Int64 // The most recent resource version seen by the server
	storageMetrics *StorageMetrics
	indexMetrics   *BleveIndexMetrics

	// Background watch task -- this has permissions for everything
	ctx         context.Context
	cancel      context.CancelFunc
	broadcaster Broadcaster[*WrittenEvent]

	// init checking: once guards Init; initErr holds its result (Stop overwrites it)
	once    sync.Once
	initErr error

	maxPageSizeBytes int
	reg              prometheus.Registerer
	queue            QOSEnqueuer
}

// Init implements ResourceServer. Its body runs at most once (guarded by
// s.once); every call returns the error recorded in s.initErr.
//
// Setup happens in stages, and each stage is skipped once an earlier one has
// failed:
//  1. the lifecycle Init hook, when configured (its error is wrapped)
//  2. search index initialization, when search support is configured
//  3. starting the background watcher that feeds the broadcaster
//
// A failure is logged before being returned.
func (s *server) Init(ctx context.Context) error {
	s.once.Do(func() {
		if s.lifecycle != nil {
			if hookErr := s.lifecycle.Init(ctx); hookErr != nil {
				s.initErr = fmt.Errorf("initialize Resource Server: %w", hookErr)
			}
		}

		stages := []func() error{
			func() error {
				if s.search == nil {
					return nil
				}
				return s.search.init(ctx)
			},
			s.initWatcher,
		}
		for _, stage := range stages {
			if s.initErr != nil {
				break
			}
			s.initErr = stage()
		}

		if s.initErr != nil {
			s.log.Error("error running resource server init", "error", s.initErr)
		}
	})
	return s.initErr
}

// Stop shuts the server down. It first records "service is stopping" as the
// init error, runs the lifecycle Stop hook when one is configured, and then
// cancels the server context, which ends the background watch stream.
//
// When the lifecycle hook fails, the wrapped hook error is returned and left
// in s.initErr. Otherwise Stop returns nil and s.initErr becomes
// "service is stopped". Because Init's body runs only once, later Init calls
// report the value stored here.
func (s *server) Stop(ctx context.Context) error {
	s.initErr = fmt.Errorf("service is stopping")

	var stopErr error
	if s.lifecycle != nil {
		if err := s.lifecycle.Stop(ctx); err != nil {
			stopErr = fmt.Errorf("service stopped with error: %w", err)
			s.initErr = stopErr
		}
	}

	// Stops the streaming
	s.cancel()

	// mark the value as done
	if stopErr != nil {
		return stopErr
	}
	s.initErr = fmt.Errorf("service is stopped")

	return nil
}

// newEvent validates a create or update request and turns it into a WriteEvent.
// A nil oldValue means the request is a create (event type ADDED); otherwise it
// is an update (MODIFIED) and oldValue is decoded into event.ObjectOld.
//
// Checks run in this order, and the first failure is returned as an ErrorResult:
//  1. value must decode into an object with accessible metadata
//  2. the command labels (get-history / get-trash / get-fullpath) and the
//     grant-permissions annotation must not be saved
//  3. oldValue, when given, must decode the same way
//  4. the body's namespace must equal key.Namespace
//  5. the body must carry a kind and an apiVersion; a non-empty group must
//     equal key.Group
//  6. the name must be present and match key.Name; when key.Name is empty it is
//     filled in from the body (this mutates the caller's key)
//  7. validateName must accept the name
//  8. authorization: for a folder move, update in the old folder plus create in
//     the new one; otherwise create (or update, by name, for MODIFIED) in the
//     object's folder
//  9. objects whose manager kind is a repository must pass
//     writeHooks.CanWriteValueFromRepository
//
// A missing UID or a non-empty resourceVersion in the body is only logged, not
// rejected. Each returned event gets a fresh uuid as its GUID.
func (s *server) newEvent(ctx context.Context, user claims.AuthInfo, key *resourcepb.ResourceKey, value, oldValue []byte) (*WriteEvent, *resourcepb.ErrorResult) {
	tmp := &unstructured.Unstructured{}
	err := tmp.UnmarshalJSON(value)
	if err != nil {
		return nil, AsErrorResult(err)
	}
	obj, err := utils.MetaAccessor(tmp)
	if err != nil {
		return nil, AsErrorResult(err)
	}

	if obj.GetUID() == "" {
		// TODO! once https://github.com/grafana/grafana/pull/96086 is deployed everywhere
		// return nil, NewBadRequestError("object is missing UID")
		s.log.Error("object is missing UID", "key", key)
	}

	// Only logged for now; the request is still accepted.
	if obj.GetResourceVersion() != "" {
		s.log.Error("object must not include a resource version", "key", key)
	}

	// Make sure the command labels are not saved
	for k := range obj.GetLabels() {
		if k == utils.LabelKeyGetHistory || k == utils.LabelKeyGetTrash || k == utils.LabelGetFullpath {
			return nil, NewBadRequestError("can not save label: " + k)
		}
	}

	if obj.GetAnnotation(utils.AnnoKeyGrantPermissions) != "" {
		return nil, NewBadRequestError("can not save annotation: " + utils.AnnoKeyGrantPermissions)
	}

	event := &WriteEvent{
		Value:  value,
		Key:    key,
		Object: obj,
		GUID:   uuid.New().String(),
	}

	// The presence of a previous value decides create vs. update.
	if oldValue == nil {
		event.Type = resourcepb.WatchEvent_ADDED
	} else {
		event.Type = resourcepb.WatchEvent_MODIFIED

		temp := &unstructured.Unstructured{}
		err = temp.UnmarshalJSON(oldValue)
		if err != nil {
			return nil, AsErrorResult(err)
		}
		event.ObjectOld, err = utils.MetaAccessor(temp)
		if err != nil {
			return nil, AsErrorResult(err)
		}
	}

	if key.Namespace != obj.GetNamespace() {
		return nil, NewBadRequestError("key/namespace do not match")
	}

	gvk := obj.GetGroupVersionKind()
	if gvk.Kind == "" {
		return nil, NewBadRequestError("expecting resources with a kind in the body")
	}
	if gvk.Version == "" {
		return nil, NewBadRequestError("expecting resources with an apiVersion")
	}
	// An empty group in the body is tolerated; only a conflicting one is rejected.
	if gvk.Group != "" && gvk.Group != key.Group {
		return nil, NewBadRequestError(
			fmt.Sprintf("group in key does not match group in the body (%s != %s)", key.Group, gvk.Group),
		)
	}

	// This needs to be a create function
	// When the key has no name, adopt the body's name. This mutates the
	// caller's key, and the permission checks below rely on key.Name.
	if key.Name == "" {
		if obj.GetName() == "" {
			return nil, NewBadRequestError("missing name")
		}
		key.Name = obj.GetName()
	} else if key.Name != obj.GetName() {
		return nil, NewBadRequestError(
			fmt.Sprintf("key/name do not match (key: %s, name: %s)", key.Name, obj.GetName()))
	}
	if err := validateName(obj.GetName()); err != nil {
		return nil, err
	}

	// For folder moves, we need to check permissions on both folders
	if s.isFolderMove(event) {
		if err := s.checkFolderMovePermissions(ctx, user, key, event.ObjectOld.GetFolder(), obj.GetFolder()); err != nil {
			return nil, err
		}
	} else {
		// Regular permission check for create/update
		check := claims.CheckRequest{
			Verb:      utils.VerbCreate,
			Group:     key.Group,
			Resource:  key.Resource,
			Namespace: key.Namespace,
		}

		// Updates are checked against the specific object name.
		if event.Type == resourcepb.WatchEvent_MODIFIED {
			check.Verb = utils.VerbUpdate
			check.Name = key.Name
		}

		check.Folder = obj.GetFolder()
		a, err := s.access.Check(ctx, user, check)
		if err != nil {
			return nil, AsErrorResult(err)
		}
		if !a.Allowed {
			return nil, &resourcepb.ErrorResult{
				Code: http.StatusForbidden,
			}
		}
	}

	// Objects managed by a repository need an extra write-permission hook.
	m, ok := obj.GetManagerProperties()
	if ok && m.Kind == utils.ManagerKindRepo {
		err = s.writeHooks.CanWriteValueFromRepository(ctx, user, m.Identity)
		if err != nil {
			return nil, AsErrorResult(err)
		}
	}
	return event, nil
}

// isFolderMove reports whether event is an update whose previous version was
// stored in a different folder than the new version. Creates, and updates
// without a decoded previous object, are never folder moves.
func (s *server) isFolderMove(event *WriteEvent) bool {
	if event.Type != resourcepb.WatchEvent_MODIFIED || event.ObjectOld == nil {
		return false
	}
	return event.ObjectOld.GetFolder() != event.Object.GetFolder()
}

// checkFolderMovePermissions authorizes moving a resource between folders. The
// caller must be allowed to update the named resource in oldFolder, and then to
// create the resource in newFolder. The first access-client error, or the first
// denial (403 with a message naming the failing side), is returned; nil means
// both checks passed.
func (s *server) checkFolderMovePermissions(ctx context.Context, user claims.AuthInfo, key *resourcepb.ResourceKey, oldFolder, newFolder string) *resourcepb.ErrorResult {
	steps := []struct {
		req    claims.CheckRequest
		denied string
	}{
		{
			// Leaving the source folder counts as updating the existing object.
			req: claims.CheckRequest{
				Verb:      utils.VerbUpdate,
				Group:     key.Group,
				Resource:  key.Resource,
				Namespace: key.Namespace,
				Name:      key.Name,
				Folder:    oldFolder,
			},
			denied: "not allowed to update resource in the source folder",
		},
		{
			// Entering the destination folder counts as creating a new object there.
			req: claims.CheckRequest{
				Verb:      utils.VerbCreate,
				Group:     key.Group,
				Resource:  key.Resource,
				Namespace: key.Namespace,
				Folder:    newFolder,
			},
			denied: "not allowed to create resource in the destination folder",
		},
	}

	for _, step := range steps {
		decision, err := s.access.Check(ctx, user, step.req)
		if err != nil {
			return AsErrorResult(err)
		}
		if !decision.Allowed {
			return &resourcepb.ErrorResult{
				Code:    http.StatusForbidden,
				Message: step.denied,
			}
		}
	}
	return nil
}

// Create implements resourcepb.ResourceStoreServer. It resolves the caller
// from ctx and then runs the actual create through the QoS queue, keyed by the
// request namespace.
//
// Request problems are reported in the response's Error field:
//   - 401 when no user is present in ctx
//   - a bad-request error when the request has no key
//
// A failure to enqueue is translated by HandleQueueError.
func (s *server) Create(ctx context.Context, req *resourcepb.CreateRequest) (*resourcepb.CreateResponse, error) {
	ctx, span := s.tracer.Start(ctx, "storage_server.Create")
	defer span.End()

	rsp := &resourcepb.CreateResponse{}
	user, ok := claims.AuthInfoFrom(ctx)
	if !ok || user == nil {
		rsp.Error = &resourcepb.ErrorResult{
			Message: "no user found in context",
			Code:    http.StatusUnauthorized,
		}
		return rsp, nil
	}
	// The key is dereferenced below (the queue tenant and throughout newEvent);
	// it comes from the client, so reject a missing one instead of panicking.
	if req.Key == nil {
		rsp.Error = NewBadRequestError("missing resource key")
		return rsp, nil
	}

	var (
		res *resourcepb.CreateResponse
		err error
	)
	runErr := s.runInQueue(ctx, req.Key.Namespace, func() {
		res, err = s.create(ctx, user, req)
	})
	if runErr != nil {
		return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.CreateResponse {
			return &resourcepb.CreateResponse{Error: e}
		})
	}

	return res, err
}

// create validates req into an ADDED WriteEvent (newEvent with no previous
// value) and writes it to the backend. Validation, authorization and write
// failures are all reported through the response's Error field; the returned
// Go error is always nil.
//
// Existence is not pre-checked: per the original design, the backend write
// fails with an already-exists error (mapped by AsErrorResult), which relies on
// the database's ACID guarantees to avoid racing creates.
func (s *server) create(ctx context.Context, user claims.AuthInfo, req *resourcepb.CreateRequest) (*resourcepb.CreateResponse, error) {
	event, invalid := s.newEvent(ctx, user, req.Key, req.Value, nil)
	if invalid != nil {
		return &resourcepb.CreateResponse{Error: invalid}, nil
	}

	rv, writeErr := s.backend.WriteEvent(ctx, *event)
	rsp := &resourcepb.CreateResponse{ResourceVersion: rv}
	if writeErr != nil {
		rsp.Error = AsErrorResult(writeErr)
	}
	s.log.Debug("server.WriteEvent", "type", event.Type, "rv", rsp.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "name", event.Key.Name, "resource", event.Key.Resource)
	return rsp, nil
}

// Update implements resourcepb.ResourceStoreServer. It resolves the caller
// from ctx, validates the request, and then runs the actual update through the
// QoS queue, keyed by the request namespace.
//
// Request problems are reported in the response's Error field:
//   - 401 when no user is present in ctx
//   - a bad-request error when the request has no key
//   - a bad-request error when ResourceVersion is negative
//
// A failure to enqueue is translated by HandleQueueError.
func (s *server) Update(ctx context.Context, req *resourcepb.UpdateRequest) (*resourcepb.UpdateResponse, error) {
	ctx, span := s.tracer.Start(ctx, "storage_server.Update")
	defer span.End()

	rsp := &resourcepb.UpdateResponse{}
	user, ok := claims.AuthInfoFrom(ctx)
	if !ok || user == nil {
		rsp.Error = &resourcepb.ErrorResult{
			Message: "no user found in context",
			Code:    http.StatusUnauthorized,
		}
		return rsp, nil
	}
	// The key comes from the client and is dereferenced below; reject a
	// missing one instead of panicking.
	if req.Key == nil {
		rsp.Error = NewBadRequestError("missing resource key")
		return rsp, nil
	}
	if req.ResourceVersion < 0 {
		rsp.Error = AsErrorResult(apierrors.NewBadRequest("update must include the previous version"))
		return rsp, nil
	}

	var (
		res *resourcepb.UpdateResponse
		err error
	)
	runErr := s.runInQueue(ctx, req.Key.Namespace, func() {
		res, err = s.update(ctx, user, req)
	})
	if runErr != nil {
		return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.UpdateResponse {
			return &resourcepb.UpdateResponse{Error: e}
		})
	}

	return res, err
}

// update performs an update on behalf of user once Update has validated the
// request. It reads the latest stored version, enforces optimistic locking when
// req.ResourceVersion is positive, builds the event with newEvent (which also
// runs the authorization checks), and writes it with PreviousRV set to the
// version that was read.
//
// Read, validation, authorization and write failures are reported in the
// response's Error field. An optimistic-locking conflict is returned as the Go
// error ErrOptimisticLockingFailed.
func (s *server) update(ctx context.Context, user claims.AuthInfo, req *resourcepb.UpdateRequest) (*resourcepb.UpdateResponse, error) {
	rsp := &resourcepb.UpdateResponse{}
	latest := s.backend.ReadResource(ctx, &resourcepb.ReadRequest{
		Key: req.Key,
	})
	if latest.Error != nil {
		// Surface the read failure; returning an empty response here would look
		// like a successful update with resource version 0.
		rsp.Error = latest.Error
		return rsp, nil
	}
	if latest.Value == nil {
		rsp.Error = NewBadRequestError("current value does not exist")
		return rsp, nil
	}

	if req.ResourceVersion > 0 && latest.ResourceVersion != req.ResourceVersion {
		return nil, ErrOptimisticLockingFailed
	}

	event, e := s.newEvent(ctx, user, req.Key, req.Value, latest.Value)
	if e != nil {
		rsp.Error = e
		return rsp, nil
	}

	event.Type = resourcepb.WatchEvent_MODIFIED
	event.PreviousRV = latest.ResourceVersion

	var err error
	rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, *event)
	if err != nil {
		rsp.Error = AsErrorResult(err)
	}
	return rsp, nil
}

// Delete implements resourcepb.ResourceStoreServer. It validates the request,
// resolves the caller from ctx, and then runs the actual delete through the
// QoS queue, keyed by the request namespace.
//
// Request problems are reported in the response's Error field (matching
// Update):
//   - a bad-request error when ResourceVersion is negative
//   - 401 when no user is present in ctx
//   - a bad-request error when the request has no key
//
// A failure to enqueue is translated by HandleQueueError.
func (s *server) Delete(ctx context.Context, req *resourcepb.DeleteRequest) (*resourcepb.DeleteResponse, error) {
	ctx, span := s.tracer.Start(ctx, "storage_server.Delete")
	defer span.End()

	rsp := &resourcepb.DeleteResponse{}
	if req.ResourceVersion < 0 {
		rsp.Error = AsErrorResult(apierrors.NewBadRequest("delete must include the previous version"))
		return rsp, nil
	}
	user, ok := claims.AuthInfoFrom(ctx)
	if !ok || user == nil {
		rsp.Error = &resourcepb.ErrorResult{
			Message: "no user found in context",
			Code:    http.StatusUnauthorized,
		}
		return rsp, nil
	}
	// The key comes from the client and is dereferenced below; reject a
	// missing one instead of panicking.
	if req.Key == nil {
		rsp.Error = NewBadRequestError("missing resource key")
		return rsp, nil
	}

	var (
		res *resourcepb.DeleteResponse
		err error
	)

	runErr := s.runInQueue(ctx, req.Key.Namespace, func() {
		res, err = s.delete(ctx, user, req)
	})
	if runErr != nil {
		return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.DeleteResponse {
			return &resourcepb.DeleteResponse{Error: e}
		})
	}

	return res, err
}

// delete removes the resource named by req.Key on behalf of user. It reads the
// latest stored version, enforces optimistic locking when req.ResourceVersion
// is positive, and checks the "delete" permission against the stored folder.
// It then writes a DELETED event whose value is the previous object turned
// into a deletion marker:
//   - deletion and updated timestamps are set from s.now
//   - updated-by is set to the caller
//   - managed fields, finalizers and the kubectl last-applied annotation are cleared
//   - the generation is set to utils.DeletedGeneration
//
// Read, locking, authorization and write failures are reported in the
// response's Error field. Failures to decode the stored object or encode the
// marker are returned as Go errors.
func (s *server) delete(ctx context.Context, user claims.AuthInfo, req *resourcepb.DeleteRequest) (*resourcepb.DeleteResponse, error) {
	rsp := &resourcepb.DeleteResponse{}

	current := s.backend.ReadResource(ctx, &resourcepb.ReadRequest{Key: req.Key})
	switch {
	case current.Error != nil:
		rsp.Error = current.Error
		return rsp, nil
	case req.ResourceVersion > 0 && current.ResourceVersion != req.ResourceVersion:
		rsp.Error = AsErrorResult(ErrOptimisticLockingFailed)
		return rsp, nil
	}

	key := req.Key
	decision, err := s.access.Check(ctx, user, claims.CheckRequest{
		Verb:      "delete",
		Group:     key.Group,
		Resource:  key.Resource,
		Namespace: key.Namespace,
		Name:      key.Name,
		Folder:    current.Folder,
	})
	if err != nil {
		rsp.Error = AsErrorResult(err)
		return rsp, nil
	}
	if !decision.Allowed {
		rsp.Error = &resourcepb.ErrorResult{Code: http.StatusForbidden}
		return rsp, nil
	}

	now := metav1.NewTime(time.UnixMilli(s.now()))
	guid := uuid.New().String()

	marker := &unstructured.Unstructured{}
	if err = json.Unmarshal(current.Value, marker); err != nil {
		return nil, apierrors.NewBadRequest(
			fmt.Sprintf("unable to read previous object, %v", err))
	}
	obj, err := utils.MetaAccessor(marker)
	if err != nil {
		return nil, err
	}

	obj.SetDeletionTimestamp(&now)
	obj.SetUpdatedTimestamp(&now.Time)
	obj.SetManagedFields(nil)
	obj.SetFinalizers(nil)
	obj.SetUpdatedBy(user.GetUID())
	obj.SetGeneration(utils.DeletedGeneration)
	obj.SetAnnotation(utils.AnnoKeyKubectlLastAppliedConfig, "") // clears it

	payload, err := marker.MarshalJSON()
	if err != nil {
		return nil, apierrors.NewBadRequest(
			fmt.Sprintf("unable creating deletion marker, %v", err))
	}

	rv, err := s.backend.WriteEvent(ctx, WriteEvent{
		Key:        key,
		Type:       resourcepb.WatchEvent_DELETED,
		PreviousRV: current.ResourceVersion,
		GUID:       guid,
		Value:      payload,
	})
	rsp.ResourceVersion = rv
	if err != nil {
		rsp.Error = AsErrorResult(err)
	}
	return rsp, nil
}

// Read implements resourcepb.ResourceStoreServer. It resolves the caller from
// ctx, validates the key, and then runs the actual read through the QoS queue,
// keyed by the request namespace.
//
// Request problems are reported in the response's Error field:
//   - 401 when no user is present in ctx
//   - a bad-request error when the key is missing or has no resource
//
// The group is not validated here (an empty group is accepted). A failure to
// enqueue is translated by HandleQueueError.
func (s *server) Read(ctx context.Context, req *resourcepb.ReadRequest) (*resourcepb.ReadResponse, error) {
	// Trace reads the same way as the other request handlers.
	ctx, span := s.tracer.Start(ctx, "storage_server.Read")
	defer span.End()

	user, ok := claims.AuthInfoFrom(ctx)
	if !ok || user == nil {
		return &resourcepb.ReadResponse{
			Error: &resourcepb.ErrorResult{
				Message: "no user found in context",
				Code:    http.StatusUnauthorized,
			}}, nil
	}

	// The key comes from the client and is dereferenced below; reject a
	// missing one instead of panicking.
	if req.Key == nil {
		return &resourcepb.ReadResponse{Error: NewBadRequestError("missing resource key")}, nil
	}
	if req.Key.Resource == "" {
		return &resourcepb.ReadResponse{Error: NewBadRequestError("missing resource")}, nil
	}

	var (
		res *resourcepb.ReadResponse
		err error
	)
	runErr := s.runInQueue(ctx, req.Key.Namespace, func() {
		res, err = s.read(ctx, user, req)
	})
	if runErr != nil {
		return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.ReadResponse {
			return &resourcepb.ReadResponse{Error: e}
		})
	}

	return res, err
}

// read fetches req from the backend and authorizes a "get" against the folder
// the backend reports. A not-found result is returned as-is before any access
// check; any other backend error is returned only to callers that pass the
// check. Authorization failures become an error result (403 on denial). The
// returned Go error is always nil.
func (s *server) read(ctx context.Context, user claims.AuthInfo, req *resourcepb.ReadRequest) (*resourcepb.ReadResponse, error) {
	found := s.backend.ReadResource(ctx, req)
	if found.Error != nil && found.Error.Code == http.StatusNotFound {
		return &resourcepb.ReadResponse{Error: found.Error}, nil
	}

	key := req.Key
	decision, err := s.access.Check(ctx, user, claims.CheckRequest{
		Verb:      "get",
		Group:     key.Group,
		Resource:  key.Resource,
		Namespace: key.Namespace,
		Name:      key.Name,
		Folder:    found.Folder,
	})
	switch {
	case err != nil:
		return &resourcepb.ReadResponse{Error: AsErrorResult(err)}, nil
	case !decision.Allowed:
		return &resourcepb.ReadResponse{
			Error: &resourcepb.ErrorResult{Code: http.StatusForbidden},
		}, nil
	default:
		return &resourcepb.ReadResponse{
			ResourceVersion: found.ResourceVersion,
			Value:           found.Value,
			Error:           found.Error,
		}, nil
	}
}

// List implements resourcepb.ResourceStoreServer. It returns one page of items
// for req, keeping only the items the caller may see.
//
// req.Source selects where items come from: STORE uses the backend's
// ListIterator, and HISTORY and TRASH use ListHistory. HISTORY and TRASH do
// not accept field or label selectors. A TRASH item is visible only to the
// user recorded as its last updater, or to callers allowed the set-permissions
// verb. Other sources require the "get" verb per item.
//
// A page ends after req.Limit items (50 when unset or below 1), or once the
// accumulated value bytes reach s.maxPageSizeBytes, whichever comes first. The
// item that crosses the byte limit is still included. NextPageToken is only
// set when the iterator has more items.
//
// Request, authorization and backend problems are returned in the response's
// Error field; an unknown source is returned as a Go bad-request error.
func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resourcepb.ListResponse, error) {
	ctx, span := s.tracer.Start(ctx, "storage_server.List")
	defer span.End()

	// Options and Options.Key are dereferenced throughout and come from the
	// client; reject a request that omits them instead of panicking.
	if req.Options == nil || req.Options.Key == nil {
		return &resourcepb.ListResponse{
			Error: NewBadRequestError("missing list options or key"),
		}, nil
	}

	// The history + trash queries do not yet support additional filters
	if req.Source != resourcepb.ListRequest_STORE {
		if len(req.Options.Fields) > 0 || len(req.Options.Labels) > 0 {
			return &resourcepb.ListResponse{
				Error: NewBadRequestError("unexpected field/label selector for history query"),
			}, nil
		}
	}

	user, ok := claims.AuthInfoFrom(ctx)
	if !ok || user == nil {
		return &resourcepb.ListResponse{
			Error: &resourcepb.ErrorResult{
				Message: "no user found in context",
				Code:    http.StatusUnauthorized,
			}}, nil
	}

	// Do not allow label query for trash/history; they are selected via req.Source
	for _, v := range req.Options.Labels {
		if v.Key == utils.LabelKeyGetHistory || v.Key == utils.LabelKeyGetTrash {
			return &resourcepb.ListResponse{Error: NewBadRequestError("history and trash must be requested as source")}, nil
		}
	}

	if req.Limit < 1 {
		req.Limit = 50 // default max 50 items in a page
	}
	maxPageBytes := s.maxPageSizeBytes
	pageBytes := 0
	rsp := &resourcepb.ListResponse{}

	key := req.Options.Key
	checker, err := s.access.Compile(ctx, user, claims.ListRequest{
		Group:     key.Group,
		Resource:  key.Resource,
		Namespace: key.Namespace,
		Verb:      utils.VerbGet,
	})
	// Handle this error right away: the trash Compile below reassigns err and
	// would otherwise hide this failure behind a misleading 403.
	if err != nil {
		return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil
	}
	if checker == nil {
		return &resourcepb.ListResponse{Error: &resourcepb.ErrorResult{
			Code: http.StatusForbidden,
		}}, nil
	}

	var trashChecker claims.ItemChecker // only for trash
	if req.Source == resourcepb.ListRequest_TRASH {
		trashChecker, err = s.access.Compile(ctx, user, claims.ListRequest{
			Group:     key.Group,
			Resource:  key.Resource,
			Namespace: key.Namespace,
			Verb:      utils.VerbSetPermissions, // Basically Admin
		})
		if err != nil {
			return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil
		}
	}

	// iterFunc fills rsp.Items until the page is full, then peeks one item
	// ahead to decide whether a continue token is needed.
	iterFunc := func(iter ListIterator) error {
		for iter.Next() {
			if err := iter.Error(); err != nil {
				return err
			}

			item := &resourcepb.ResourceWrapper{
				ResourceVersion: iter.ResourceVersion(),
				Value:           iter.Value(),
			}
			// Trash is only accessible to admins or the user who deleted the object
			if req.Source == resourcepb.ListRequest_TRASH {
				if !s.isTrashItemAuthorized(ctx, iter, trashChecker) {
					continue
				}
			} else if !checker(iter.Name(), iter.Folder()) {
				continue
			}

			pageBytes += len(item.Value)
			rsp.Items = append(rsp.Items, item)
			if len(rsp.Items) >= int(req.Limit) || pageBytes >= maxPageBytes {
				// Capture the token before advancing; it resumes after this item.
				t := iter.ContinueToken()
				if iter.Next() {
					rsp.NextPageToken = t
				}
				return iter.Error()
			}
		}
		return iter.Error()
	}

	var rv int64
	switch req.Source {
	case resourcepb.ListRequest_STORE:
		rv, err = s.backend.ListIterator(ctx, req, iterFunc)
	case resourcepb.ListRequest_HISTORY, resourcepb.ListRequest_TRASH:
		rv, err = s.backend.ListHistory(ctx, req, iterFunc)
	default:
		return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid list source: %v", req.Source))
	}

	if err != nil {
		rsp.Error = AsErrorResult(err)
		return rsp, nil
	}

	if rv < 1 {
		rsp.Error = &resourcepb.ErrorResult{
			Code:    http.StatusInternalServerError,
			Message: fmt.Sprintf("invalid resource version for list: %v", rv),
		}
		return rsp, nil
	}
	rsp.ResourceVersion = rv
	return rsp, nil
}

// isTrashItemAuthorized reports whether the user in ctx may see the trash item
// at the iterator's current position. Access is granted when the item's
// updated-by identity matches the caller's UID (delete records the deleting
// user there), or when trashChecker allows the item's name and folder.
//
// Any failure denies access: no user in ctx, or item metadata that cannot be
// decoded. An empty caller UID never counts as a match, and a nil trashChecker
// grants nothing, so neither case can widen access or panic.
func (s *server) isTrashItemAuthorized(ctx context.Context, iter ListIterator, trashChecker claims.ItemChecker) bool {
	user, ok := claims.AuthInfoFrom(ctx)
	if !ok || user == nil {
		return false
	}

	partial := &metav1.PartialObjectMetadata{}
	if err := json.Unmarshal(iter.Value(), partial); err != nil {
		return false
	}

	obj, err := utils.MetaAccessor(partial)
	if err != nil {
		return false
	}

	// The user who deleted the object may always see it.
	if uid := user.GetUID(); uid != "" && obj.GetUpdatedBy() == uid {
		return true
	}
	// Otherwise only admins (as decided by trashChecker) may see it.
	return trashChecker != nil && trashChecker(iter.Name(), iter.Folder())
}

// initWatcher creates s.broadcaster, fed by the backend's WatchWriteEvents
// stream. A goroutine forwards each event to the broadcaster. It skips nil
// events (with a log) and events whose PreviousRV is negative (batch updates),
// and stores each forwarded event's resource version in s.mostRecentRV. The
// goroutine exits when the backend channel closes or s.ctx is cancelled, so a
// send to a broadcaster that is no longer reading cannot block it forever.
func (s *server) initWatcher() error {
	var err error
	s.broadcaster, err = NewBroadcaster(s.ctx, func(out chan<- *WrittenEvent) error {
		events, err := s.backend.WatchWriteEvents(s.ctx)
		if err != nil {
			return err
		}
		go func() {
			for v := range events {
				if v == nil {
					s.log.Error("received nil event")
					continue
				}
				// Skip events during batch updates
				if v.PreviousRV < 0 {
					continue
				}

				s.log.Debug("Server. Streaming Event", "type", v.Type, "previousRV", v.PreviousRV, "group", v.Key.Group, "namespace", v.Key.Namespace, "resource", v.Key.Resource, "name", v.Key.Name)
				s.mostRecentRV.Store(v.ResourceVersion)
				select {
				case out <- v:
				case <-s.ctx.Done():
					// Server is shutting down; stop forwarding.
					return
				}
			}
		}()
		return nil
	})
	return err
}

// Watch streams change events for the resources selected by req.Options.Key
// to srv. It runs until the stream context is cancelled or the broadcaster
// subscription is closed.
//
// The caller must be authenticated. Each event is filtered through the item
// checker compiled for a "get" on the requested group/resource/namespace.
//
// Starting point:
//   - SendInitialEvents: every existing object is first sent as an ADDED
//     event, optionally followed by a BOOKMARK (AllowWatchBookmarks). Only
//     events newer than the list's resource version are streamed afterwards.
//   - Since > 0: only events newer than req.Since are streamed.
//   - Otherwise: only events newer than the storage's current resource
//     version (falling back to the broadcaster's last seen RV) are streamed.
//
// Events with PreviousRV > 0 carry the previous object when it can be read.
// If the previous object can't be read, the event is still sent without it.
//
//nolint:gocyclo
func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStore_WatchServer) error {
	ctx := srv.Context()

	user, ok := claims.AuthInfoFrom(ctx)
	if !ok || user == nil {
		return apierrors.NewUnauthorized("no user found in context")
	}

	// Every path below dereferences req.Options.Key; reject malformed requests
	// instead of panicking on a nil pointer.
	if req.Options == nil || req.Options.Key == nil {
		return apierrors.NewBadRequest("missing watch key")
	}
	key := req.Options.Key
	checker, err := s.access.Compile(ctx, user, claims.ListRequest{
		Group:     key.Group,
		Resource:  key.Resource,
		Namespace: key.Namespace,
		Verb:      utils.VerbGet,
	})
	if err != nil {
		return err
	}
	if checker == nil {
		return apierrors.NewUnauthorized("not allowed to list anything") // ?? or a single error?
	}

	// Start listening -- this will buffer any changes that happen while we backfill.
	// If events are generated faster than we can process them, then some events will be dropped.
	// TODO: Think of a way to allow the client to catch up.
	stream, err := s.broadcaster.Subscribe(ctx)
	if err != nil {
		return err
	}
	defer s.broadcaster.Unsubscribe(stream)

	// Determine a safe starting resource-version for the watch.
	// When the client requests SendInitialEvents we will use the resource-version
	// of the last object returned from the initial list (handled below).
	// When the client supplies an explicit `since` we honour that.
	// In the remaining case (SendInitialEvents == false && since == 0) we need
	// a high-water-mark representing the current state of storage so that we
	// don't replay events that happened before the watch was established. Using
	// `mostRecentRV` – which is updated asynchronously by the broadcaster – is
	// subject to races because the broadcaster may not yet have observed the
	// latest committed writes. Instead we ask the backend directly for the
	// current resource-version.
	var mostRecentRV int64
	if !req.SendInitialEvents && req.Since == 0 {
		// We only need the current RV. A cheap way to obtain it is to issue a
		// List with a very small limit and read the listRV returned by the
		// iterator. The callback is a no-op so we avoid materialising any
		// items.
		listReq := &resourcepb.ListRequest{
			Options: req.Options,
			// This has right now no effect, as the list request only uses the limit if it lists from history or trash.
			// It might be worth adding it in a subsequent PR. We only list once during setup of the watch, so it's
			// fine for now.
			Limit: 1,
		}

		rv, err := s.backend.ListIterator(ctx, listReq, func(ListIterator) error { return nil })
		if err != nil {
			// Fall back to the broadcaster's view if the backend lookup fails.
			// This preserves previous behaviour while still eliminating the
			// common race in the majority of cases.
			s.log.Warn("watch: failed to fetch current RV from backend, falling back to broadcaster", "err", err)
			mostRecentRV = s.mostRecentRV.Load()
		} else {
			mostRecentRV = rv
		}
	} else {
		// For all other code-paths we either already have an explicit RV or we
		// will derive it from the initial list below.
		mostRecentRV = s.mostRecentRV.Load()
	}

	var initialEventsRV int64 // resource version coming from the initial events
	if req.SendInitialEvents {
		// Backfill the stream by adding every existing entities.
		initialEventsRV, err = s.backend.ListIterator(ctx, &resourcepb.ListRequest{Options: req.Options}, func(iter ListIterator) error {
			for iter.Next() {
				if err := iter.Error(); err != nil {
					return err
				}
				if err := srv.Send(&resourcepb.WatchEvent{
					Type: resourcepb.WatchEvent_ADDED,
					Resource: &resourcepb.WatchEvent_Resource{
						Value:   iter.Value(),
						Version: iter.ResourceVersion(),
					},
				}); err != nil {
					return err
				}
			}
			return iter.Error()
		})
		if err != nil {
			return err
		}
	}
	if req.SendInitialEvents && req.AllowWatchBookmarks {
		if err := srv.Send(&resourcepb.WatchEvent{
			Type: resourcepb.WatchEvent_BOOKMARK,
			Resource: &resourcepb.WatchEvent_Resource{
				Version: initialEventsRV,
			},
		}); err != nil {
			return err
		}
	}

	var since int64 // resource version to start watching from
	switch {
	case req.SendInitialEvents:
		since = initialEventsRV
	case req.Since == 0:
		since = mostRecentRV
	default:
		since = req.Since
	}
	for {
		select {
		case <-ctx.Done():
			return nil

		case event, ok := <-stream:
			if !ok {
				s.log.Debug("watch events closed")
				return nil
			}
			s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name)
			if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
				if !checker(event.Key.Name, event.Folder) {
					continue
				}

				value := event.Value
				// remove the delete marker stored in the value for deleted objects
				if event.Type == resourcepb.WatchEvent_DELETED {
					value = []byte{}
				}
				resp := &resourcepb.WatchEvent{
					Timestamp: event.Timestamp,
					Type:      event.Type,
					Resource: &resourcepb.WatchEvent_Resource{
						Value:   value,
						Version: event.ResourceVersion,
					},
				}
				if event.PreviousRV > 0 {
					prevObj, err := s.Read(ctx, &resourcepb.ReadRequest{Key: event.Key, ResourceVersion: event.PreviousRV})
					switch {
					case err != nil:
						// This scenario should never happen, but if it does, we log it and continue
						// sending the event without the previous object. The client will decide what to do.
						// prevObj may be nil here, so only err is logged.
						s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", err)
					case prevObj.Error != nil:
						// A read that reports its failure in the response (e.g. the previous
						// version is gone) is handled the same way: log and send without it,
						// rather than misreporting it as a version mismatch below.
						s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", prevObj.Error)
					case prevObj.ResourceVersion != event.PreviousRV:
						s.log.Error("resource version mismatch", "key", event.Key, "resource_version", event.PreviousRV, "actual", prevObj.ResourceVersion)
						return fmt.Errorf("resource version mismatch")
					default:
						resp.Previous = &resourcepb.WatchEvent_Resource{
							Value:   prevObj.Value,
							Version: prevObj.ResourceVersion,
						}
					}
				}
				if err := srv.Send(resp); err != nil {
					return err
				}

				if s.storageMetrics != nil {
					// record latency - resource version is a unix timestamp in microseconds so we convert to seconds
					latencySeconds := float64(time.Now().UnixMicro()-event.ResourceVersion) / 1e6
					if latencySeconds > 0 {
						s.storageMetrics.WatchEventLatency.WithLabelValues(event.Key.Resource).Observe(latencySeconds)
					}
				}
			}
		}
	}
}

// Search forwards the query to the configured search component and returns
// its response unchanged. It fails with "search index not configured" when
// the server was built without search support.
func (s *server) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
	if s.search != nil {
		return s.search.Search(ctx, req)
	}
	return nil, fmt.Errorf("search index not configured")
}

// GetStats implements ResourceServer.
//
// The server is initialised first (s.Init), and any init error is returned.
// Statistics then come from the search component when it is configured.
// Otherwise the request is delegated to the storage backend if it implements
// resourcepb.ResourceIndexServer. If neither is available, an error is returned.
func (s *server) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
	if initErr := s.Init(ctx); initErr != nil {
		return nil, initErr
	}

	if s.search != nil {
		return s.search.GetStats(ctx, req)
	}

	// Without a search index, fall back to a backend that can compute stats itself.
	indexer, isIndexer := s.backend.(resourcepb.ResourceIndexServer)
	if !isIndexer {
		return nil, fmt.Errorf("search index not configured")
	}
	return indexer.GetStats(ctx, req)
}

// ListManagedObjects delegates to the search component. Like Search, it
// returns an error instead of panicking when no search index is configured.
func (s *server) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
	if s.search == nil {
		return nil, fmt.Errorf("search index not configured")
	}
	return s.search.ListManagedObjects(ctx, req)
}

// CountManagedObjects delegates to the search component. Like Search, it
// returns an error instead of panicking when no search index is configured.
func (s *server) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
	if s.search == nil {
		return nil, fmt.Errorf("search index not configured")
	}
	return s.search.CountManagedObjects(ctx, req)
}

// IsHealthy implements ResourceServer by passing the health check to the
// server's diagnostics component and returning its response and error as-is.
func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
	rsp, err := s.diagnostics.IsHealthy(ctx, req)
	return rsp, err
}

// PutBlob implements BlobStore.
//
// It stores the blob through the configured blob store. If no blob store is
// configured, the response carries a 501 Not Implemented error result.
// Failures from the blob store are reported in the response's Error field;
// the gRPC-level error is always nil.
func (s *server) PutBlob(ctx context.Context, req *resourcepb.PutBlobRequest) (*resourcepb.PutBlobResponse, error) {
	if s.blob == nil {
		return &resourcepb.PutBlobResponse{Error: &resourcepb.ErrorResult{
			Message: "blob store not configured",
			Code:    http.StatusNotImplemented,
		}}, nil
	}

	rsp, err := s.blob.PutResourceBlob(ctx, req)
	if err != nil {
		// Nothing guarantees a non-nil response alongside an error; allocate
		// one so the failure can be reported instead of panicking.
		if rsp == nil {
			rsp = &resourcepb.PutBlobResponse{}
		}
		rsp.Error = AsErrorResult(err)
	}
	return rsp, nil
}

// getPartialObject reads the resource identified by key at resource version
// rv from the backend and decodes only its object metadata.
//
// The key is validated with verifyRequestKey before anything is read.
// Validation, backend read, JSON decoding, and accessor failures are all
// returned as an ErrorResult. On success, the accessor wraps the decoded
// metav1.PartialObjectMetadata.
func (s *server) getPartialObject(ctx context.Context, key *resourcepb.ResourceKey, rv int64) (utils.GrafanaMetaAccessor, *resourcepb.ErrorResult) {
	if invalid := verifyRequestKey(key); invalid != nil {
		return nil, invalid
	}

	readRsp := s.backend.ReadResource(ctx, &resourcepb.ReadRequest{
		ResourceVersion: rv,
		Key:             key,
	})
	if readRsp.Error != nil {
		return nil, readRsp.Error
	}

	meta := new(metav1.PartialObjectMetadata)
	if err := json.Unmarshal(readRsp.Value, meta); err != nil {
		return nil, AsErrorResult(err)
	}

	accessor, err := utils.MetaAccessor(meta)
	if err != nil {
		return nil, AsErrorResult(err)
	}
	return accessor, nil
}

// GetBlob implements BlobStore.
//
// The blob is identified in one of two ways:
//   - If req.Uid is set, that UID is used directly.
//   - Otherwise the UID comes from the blob info stored in the metadata of
//     req.Resource at req.ResourceVersion. A resource without a linked blob
//     yields a 404 error result.
//
// A missing blob store yields a 501 error result. Failures from the blob store
// are reported in the response's Error field; the gRPC-level error is always nil.
func (s *server) GetBlob(ctx context.Context, req *resourcepb.GetBlobRequest) (*resourcepb.GetBlobResponse, error) {
	if s.blob == nil {
		return &resourcepb.GetBlobResponse{Error: &resourcepb.ErrorResult{
			Message: "blob store not configured",
			Code:    http.StatusNotImplemented,
		}}, nil
	}

	var info *utils.BlobInfo
	if req.Uid == "" {
		// The linked blob is stored in the resource metadata attributes
		obj, status := s.getPartialObject(ctx, req.Resource, req.ResourceVersion)
		if status != nil {
			return &resourcepb.GetBlobResponse{Error: status}, nil
		}

		info = obj.GetBlob()
		if info == nil || info.UID == "" {
			return &resourcepb.GetBlobResponse{Error: &resourcepb.ErrorResult{
				Message: "Resource does not have a linked blob",
				Code:    http.StatusNotFound,
			}}, nil
		}
	} else {
		info = &utils.BlobInfo{UID: req.Uid}
	}

	rsp, err := s.blob.GetResourceBlob(ctx, req.Resource, info, req.MustProxyBytes)
	if err != nil {
		// Nothing guarantees a non-nil response alongside an error; allocate
		// one so the failure can be reported instead of panicking.
		if rsp == nil {
			rsp = &resourcepb.GetBlobResponse{}
		}
		rsp.Error = AsErrorResult(err)
	}
	return rsp, nil
}

// runInQueue enqueues runnable on s.queue for tenantID and blocks until the
// runnable has finished executing.
//
// Enqueue failures are retried with backoff, configured by DefaultMinBackoff,
// DefaultMaxBackoff and DefaultMaxRetries. If the runnable could not be
// enqueued, an error wrapping the cause is returned without waiting. The cause
// is the last enqueue error, or the backoff's termination reason (e.g. ctx
// cancellation) when no attempt was made.
func (s *server) runInQueue(ctx context.Context, tenantID string, runnable func()) error {
	boff := backoff.New(ctx, backoff.Config{
		MinBackoff: DefaultMinBackoff,
		MaxBackoff: DefaultMaxBackoff,
		MaxRetries: DefaultMaxRetries,
	})

	var (
		wg       sync.WaitGroup
		err      error
		enqueued bool
	)
	wg.Add(1)
	wrapped := func() {
		defer wg.Done()
		runnable()
	}
	for boff.Ongoing() {
		err = s.queue.Enqueue(ctx, tenantID, wrapped)
		if err == nil {
			enqueued = true
			break
		}
		s.log.Warn("failed to enqueue runnable, retrying",
			"maxRetries", DefaultMaxRetries,
			"tenantID", tenantID,
			"error", err)
		boff.Wait()
	}
	if !enqueued {
		// The loop can exit before any attempt (e.g. ctx already cancelled),
		// leaving err nil. Waiting on wg then would block forever because
		// wrapped is never run, so surface the backoff's termination reason.
		if err == nil {
			err = boff.Err()
		}
		if err == nil {
			err = fmt.Errorf("enqueue aborted before any attempt")
		}
		s.log.Error("failed to enqueue runnable",
			"maxRetries", DefaultMaxRetries,
			"tenantID", tenantID,
			"error", err)
		return fmt.Errorf("failed to enqueue runnable for tenant %s: %w", tenantID, err)
	}
	wg.Wait()
	return nil
}
